/*
一个reactor
*/
#include "eventloop.h"
#include "channel.h"
#include "epoll.h"
#include "log.h"
#include "util.h"
#include <functional>
#include <memory>
#include <mutex>
#include <vector>
#include <sys/eventfd.h>

EventLoop *t_loopInThisThread = nullptr;

int createEventFd(){
    int efd=eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (efd < 0) {
        record() << "Failed in eventfd";
        abort();
    }
    return efd;
}

EventLoop::EventLoop():m_isLoop(false),
                      m_poller(new Epoll()),
                      m_mtx(),
                      m_quit(false),
                      m_wakefd(createEventFd()),
                      m_channel(new Channel(this,m_wakefd)),
                      m_threadID(getpid())
                      {
    t_loopInThisThread = this;
    m_channel->setEvents(EPOLLIN);
    m_channel->setReadHandler(std::bind(&EventLoop::handleRead,this));
    m_channel->setConnHandler(std::bind(&EventLoop::handleConn,this));
    m_poller->epoll_add(m_channel, 0);
}

EventLoop::~EventLoop(){
    close(m_wakefd);
    t_loopInThisThread = nullptr;
}

//reactor启动
void EventLoop::loop(){
    if(m_isLoop){
        return;
    }
    //什么情景呢?,不在这个线程调用这个函数?
    if(!isInLoopThread()){
        return;
    }
    m_isLoop = true;
    m_quit = false;
    vector<std::shared_ptr<Channel>> ret;
    while(!m_quit){
        ret.clear();
        ret = m_poller->poll();
        for(auto &it:ret){
            it->handleEvents();
        }
        dePendingFunctors();
        m_poller->handleexpired();
    }
    m_isLoop = false;
}
//停止eventloop
void EventLoop::quit(){
    m_quit = true;
    //彻底停止前处理干净m_pendingFunctors
    if(!isInLoopThread()){
        wakeup();
    }
}
//关闭某个socket
void EventLoop::shutdown(std::shared_ptr<Channel> ev){
    shutDownWR(ev->getFd());
}

//m_wakefd被唤醒时调用
//看不出来有啥意义?
void EventLoop::handleRead(){
    uint64_t one = 1;
    int64_t n = readn(m_wakefd, &one, sizeof(one));
    if(n!=sizeof(one)){
        record()<< "EventLoop::handleRead() reads " << n << " bytes instead of 8";
    }
}
void EventLoop::handleConn(){

}


void EventLoop::remove_event(std::shared_ptr<Channel> ev){
    m_poller->epoll_del(ev);
}
void EventLoop::add_event(std::shared_ptr<Channel> ev ,int timeout){
    m_poller->epoll_add(ev,timeout);
}
void EventLoop::update_event(shared_ptr<Channel> ev,int timeout){
    m_poller->epoll_mod(ev,timeout);
}

//保证每一个线程只有一个eventloop对象
bool EventLoop::isInLoopThread(){
    return m_threadID == getpid();
}

void EventLoop::queueInloop(Functor&& cb){
    {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_pendingFunctors.emplace_back(move(cb));
    }
    if(!isInLoopThread()){
        wakeup();
    }
}
//处理m_pendingFunctors
void EventLoop::dePendingFunctors(){
    vector<Functor> functors;
    {
        std::unique_lock<std::mutex> lock(m_mtx);
        functors.swap(m_pendingFunctors);
    }
    for(size_t i=0;i<functors.size();++i){
        functors[i]();
    }
}

void EventLoop::wakeup(){
    uint64_t one = 1;
    ssize_t n = writen(m_wakefd,(char*)(&one),sizeof(one));
    if(n!= sizeof(one)){
        record()<< "EventLoop::wakeup() writes " << n << " bytes instead of 8";
    }
}